package com.imooc.spark

import java.sql.DriverManager

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Created by zghgchao 2017/11/15 13:00
  * updateStateByKey算子
  * 需求：统计到目前为止累积出现的单词的个数(需要保持住以前的状态)
  * 需求：将统计结果写入到MySQL
  */
object ForeachRDDApp {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("StatefulWordCount").setMaster("local[2]")
    /**
      * 创建StreamingContext需要两个参数：SparkConf和batch interval
      */
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val lines = ssc.socketTextStream("172.17.66.107", 9999)

    val result = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

    result.print()

    //TODO 讲结果写到MySQL
    /*result.foreachRDD(rdd => {
      val connection = createConnection()
      rdd.foreach { record =>
        val sql = "insert into wordcount(word,wordcount) values('" + record._1 + "'," + record._2 + ")"
        connection.createStatement().execute(sql)
      }
    })*/

    result.foreachRDD { rdd =>
      rdd.foreachPartition { partitionOfRecords =>
        val connection = createConnection()
        partitionOfRecords.foreach(record => {
          val sql = "insert into wordcount(word,wordcount) values('" + record._1 + "'," + record._2 + ")"
          
          connection.createStatement().execute(sql)
        })
        connection.close()
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }

  def createConnection() = {
    Class.forName("com.mysql.jdbc.Driver")
    DriverManager.getConnection("jdbc:mysql://172.17.66.107:3306/imooc_spark", "root", "root")
  }
}
